package com.tsfyun.scm.config.ws;

import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.tsfyun.common.base.util.StringUtils;
import com.tsfyun.scm.config.redis.RedisUtils;
import com.tsfyun.scm.dto.support.WsMessageDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Component
public class WsHandler implements WebSocketHandler, RedisReceivermsg {

    //存放ws session的map
    private static Map<Long,WebSocketSession> socketMap = new ConcurrentHashMap<>();

    //直接注入是空的，改从其他地方设置
    private static RedisTemplate redisTemplate;

    public void setRedisTemplate(RedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Override
    public void receiveMessage(String channel,String message) {
        //接收的redis发布的字符串json会有转义字符
        message = message.replace("\\", "").replace("\"{", "{").replace("}\"", "}");
        if(Objects.equals(channel,WsConstant.USER_OFFLINE_TOPIC)) {
            JSONObject jsonObject = JSONObject.parseObject(message);
            String userId = jsonObject.getString("userId");
            String userName = jsonObject.getString("userName");
            offlineUser(userId,userName);
            return;
        }
        WsMessageDTO wsMessageDTO = JSONObject.parseObject(message,WsMessageDTO.class);
        Boolean isBroad = wsMessageDTO.getIsBroad();
        List<Long> receiveUserIds = Lists.newArrayList();
        if(Objects.equals(isBroad,Boolean.TRUE)) {
            receiveUserIds.addAll(socketMap.keySet());
            log.info("消息：{}是广播消息",message);
        } else {
            receiveUserIds = wsMessageDTO.getUserIds();
        }
        JSONObject messageJson = JSON.parseObject(message);
        messageJson.remove("isBroad");
        messageJson.remove("userIds");
        TextMessage receiveMessage = new TextMessage(messageJson.toJSONString());
        if(CollUtil.isNotEmpty(receiveUserIds)) {
            receiveUserIds.stream().forEach(userId->{
                boolean flag = sendMessageToUser(userId, receiveMessage);
                if(flag) {
                    log.info("服务端发送消息给{}成功",userId);
                }
            });
        }
    }

    public void offlineUser(String userId,String userName) {
        if(socketMap.containsKey(userId)) {
            socketMap.remove(userId);
            log.info("用户{}/{}已断开连接并从socket map中移除",userId,userName);
        }
    }

    @Override
    public void afterConnectionEstablished(WebSocketSession webSocketSession) throws Exception {
        //获取用户信息
        Long userId = (Long) webSocketSession.getAttributes().get("userId");
        String userName = StringUtils.null2EmptyWithTrim(webSocketSession.getAttributes().get("userName"));
        log.info("成功建立连接，用户{}/{}",userId,userName);
        if(Objects.isNull(socketMap.get(userId))) {
            socketMap.put(userId,webSocketSession);
            //并且通过redis发布和订阅广播给其他的的机器
        }
    }

    /**
     * 接收客户端的消息
     * @param webSocketSession
     * @param webSocketMessage
     * @throws Exception
     */
    @Override
    public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage) throws Exception {
        Long userId = (Long) webSocketSession.getAttributes().get("userId");
        String userName = StringUtils.null2EmptyWithTrim(webSocketSession.getAttributes().get("userName"));
        String msgContent = webSocketMessage.getPayload().toString();
        log.info("接收到信息：{}，来自：{}/{}",msgContent,userId,userName);
    }

    @Override
    public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) throws Exception {
        if (webSocketSession.isOpen()) {
            webSocketSession.close();
        }
        log.error("ws连接出错");
    }

    @Override
    public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) throws Exception {
        //获取用户信息
        Long userId = (Long) webSocketSession.getAttributes().get("userId");
        String userName = StringUtils.null2EmptyWithTrim(webSocketSession.getAttributes().get("userName"));
        if(Objects.nonNull(socketMap.get(userId))) {
            socketMap.remove(userId);
            //并且通过redis发布和订阅广播给其他的的机器
        } else {
            JSONObject message = new JSONObject().fluentPut("userId",userId).fluentPut("userName",userName);
            redisTemplate.convertAndSend(WsConstant.USER_OFFLINE_TOPIC,message.toJSONString());
        }
        log.info("用户：{}/{}，连接已关闭",userId,userName);
    }

    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    /**
     * 发送信息给指定用户
     * @param userId
     * @param message
     * @return
     */
    public boolean sendMessageToUser(Long userId, TextMessage message) {
        WebSocketSession session = socketMap.get(userId);
        if(session == null) {
            return false;
        }
        if (!session.isOpen()) {
            log.info("客户端{}已关闭，不发送",userId);
            return false;
        }
        try {
            log.info("正在发送消息给{}",userId);
            session.sendMessage(message);
        } catch (IOException e) {
            log.error(String.format("发送消息给%s异常",userId),e);
        }
        return true;
    }


}
